/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package wanda.util;


import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import wanda.exception.HoodieException;
import wanda.exception.HoodieIOException;
import wanda.exception.InvalidHoodiePathException;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
 * Utility functions related to accessing the file storage
 */
public class FSUtils {

  /** Logger shared by all helpers in this class. */
  private static final Logger LOG = LogManager.getLogger(FSUtils.class);

  /**
   * Shape of a log file name, e.g.
   * {@code .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1}, optionally followed by a
   * {@code _<digits>-<digits>-<digits>} suffix. Group 2 is the text between the underscore and the
   * next dot, group 4 the numeric version and group 7 the first number of the optional suffix.
   */
  private static final Pattern LOG_FILE_PATTERN =
      Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?");

  /** Leading character of every log file name. */
  private static final String LOG_FILE_PREFIX = ".";

  /** Upper bound on the number of lease recovery attempts. */
  private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;

  /** Environment variables starting with this prefix are copied into the Hadoop configuration. */
  private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";

  /** Path filter that accepts every path. */
  private static final PathFilter ALLOW_ALL_FILTER = file -> true;

  /**
   * Pins the {@code hdfs} and {@code file} file system implementations on the given configuration
   * and copies every environment variable whose name starts with {@code HOODIE_ENV_} into it.
   *
   * <p>The configuration key is the variable name with the leading {@code HOODIE_ENV_} prefix
   * removed and every {@code _DOT_} turned into {@code .}.
   *
   * @param conf configuration to update; it is modified in place
   * @return the same {@code conf} instance, for chaining
   */
  public static Configuration prepareHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

    // look for all properties, prefixed to be picked up
    for (Entry<String, String> prop : System.getenv().entrySet()) {
      String envName = prop.getKey();
      if (!envName.startsWith(HOODIE_ENV_PROPS_PREFIX)) {
        continue;
      }
      LOG.info("Picking up value for hoodie env var :" + envName);
      // Strip only the leading prefix; a later occurrence of the prefix text inside the name is
      // part of the key and must be kept.
      String confKey = envName
          .substring(HOODIE_ENV_PROPS_PREFIX.length())
          .replace("_DOT_", ".");
      conf.set(confKey, prop.getValue());
    }
    return conf;
  }

  /**
   * Resolves the file system that owns {@code path}, after running the configuration through
   * {@link #prepareHadoopConf(Configuration)}.
   *
   * @param path path whose file system is wanted
   * @param conf Hadoop configuration; it is updated in place by the preparation step
   * @return the file system for {@code path}
   * @throws HoodieIOException if the file system cannot be obtained
   */
  public static FileSystem getFs(String path, Configuration conf) {
    final Configuration preparedConf = prepareHadoopConf(conf);
    final FileSystem fileSystem;
    try {
      fileSystem = new Path(path).getFileSystem(preparedConf);
    } catch (IOException ioe) {
      String message = "Failed to get instance of " + FileSystem.class.getName();
      throw new HoodieIOException(message, ioe);
    }
    String summary = String.format(
        "Hadoop Configuration: fs.defaultFS: [%s], Config:[%s], FileSystem: [%s]",
        preparedConf.getRaw("fs.defaultFS"), preparedConf.toString(), fileSystem.toString());
    LOG.info(summary);
    return fileSystem;
  }

  /**
   * A write token uniquely identifies an attempt at one of the IOHandle operations
   * (Merge/Create/Append).
   *
   * <p>The token is built by plain concatenation rather than {@code String.format("%d")}, because
   * {@code %d} localizes digits according to the default locale and the token must always consist
   * of ASCII digits separated by {@code -}.
   *
   * @param taskPartitionId partition id of the task
   * @param stageId id of the stage
   * @param taskAttemptId id of the task attempt
   * @return {@code <taskPartitionId>-<stageId>-<taskAttemptId>}
   */
  public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
    return taskPartitionId + "-" + stageId + "-" + taskAttemptId;
  }


  /**
   * Builds a parquet data file name of the form {@code <fileId>_<writeToken>_<commitTime>.parquet}.
   *
   * @param commitTime commit time placed last, before the extension
   * @param writeToken write token placed in the middle
   * @param fileId file id placed first
   * @return the assembled file name
   */
  public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
    StringBuilder fileName = new StringBuilder();
    fileName.append(fileId).append("_");
    fileName.append(writeToken).append("_");
    fileName.append(commitTime).append(".parquet");
    return fileName.toString();
  }





  /**
   * Returns the part of a commit file name that precedes its first dot.
   *
   * @param commitFileName commit file name, e.g. {@code 20170101134598.commit}
   * @return the text before the first {@code .}, or the whole name when it contains no dot
   */
  public static String getCommitFromCommitFile(String commitFileName) {
    final String[] dotSeparatedParts = commitFileName.split("\\.");
    return dotSeparatedParts[0];
  }

  /**
   * Extracts the commit time from a file name: the third underscore-separated token, cut at its
   * first dot. This matches names of the form {@code <fileId>_<writeToken>_<commitTime>.parquet}.
   *
   * @param fullFileName file name containing at least three underscore-separated tokens
   * @return the third token up to (not including) its first {@code .}
   */
  public static String getCommitTime(String fullFileName) {
    final String[] nameTokens = fullFileName.split("_");
    final String commitWithExtension = nameTokens[2];
    final String[] commitParts = commitWithExtension.split("\\.");
    return commitParts[0];
  }

  /**
   * Looks up the length reported by the file status of {@code path}.
   *
   * @param fs file system to query
   * @param path path whose status is read
   * @return the length from the file status
   * @throws IOException if the status lookup fails
   */
  public static long getFileSize(FileSystem fs, Path path) throws IOException {
    final long lengthInBytes = fs.getFileStatus(path).getLen();
    return lengthInBytes;
  }

  /**
   * Returns the file id, i.e. the part of the file name that precedes its first underscore.
   *
   * @param fullFileName file name, e.g. {@code <fileId>_<writeToken>_<commitTime>.parquet}
   * @return the text before the first {@code _}, or the whole name when it has no underscore
   */
  public static String getFileId(String fullFileName) {
    final String[] nameTokens = fullFileName.split("_");
    return nameTokens[0];
  }


  /**
   * Given a base path and a partition path, return the partition path relative to the base path.
   *
   * <p>Scheme and authority are dropped from both paths before they are compared. The base
   * path's last component is searched for in the partition path, starting at the offset where the
   * base path's parent ends; everything after that component and its separator is the result.
   *
   * @param basePath base path of the table
   * @param partitionPath full path of the partition, expected to be the base path or below it
   * @return the relative partition path, or an empty string when the partition path ends at the
   *     base path (non-partitioned tables)
   * @throws IllegalArgumentException if the base path's last component cannot be located in the
   *     partition path, or is not followed by a path separator there
   */
  public static String getRelativePartitionPath(Path basePath, Path partitionPath) {
    basePath = Path.getPathWithoutSchemeAndAuthority(basePath);
    partitionPath = Path.getPathWithoutSchemeAndAuthority(partitionPath);
    String partitionFullPath = partitionPath.toString();
    String baseName = basePath.getName();
    int searchFrom = basePath.getParent() == null ? 0 : basePath.getParent().toString().length();
    int partitionStartIndex = partitionFullPath.indexOf(baseName, searchFrom);
    if (partitionStartIndex < 0) {
      // Without this guard the arithmetic below would run on -1 and return unrelated text.
      throw new IllegalArgumentException(
          "Partition path " + partitionFullPath + " does not belong to base path " + basePath);
    }
    int baseEndIndex = partitionStartIndex + baseName.length();
    // Partition-Path could be empty for non-partitioned tables
    if (baseEndIndex == partitionFullPath.length()) {
      return "";
    }
    if (partitionFullPath.charAt(baseEndIndex) != '/') {
      // The match was only a prefix of a longer component (e.g. base "b" inside "bc").
      throw new IllegalArgumentException(
          "Partition path " + partitionFullPath + " does not belong to base path " + basePath);
    }
    return partitionFullPath.substring(baseEndIndex + 1);
  }



  /**
   * Returns the extension of a file, defined here as everything from the FIRST dot of the last
   * path component onwards (so {@code x.tar.gz} yields {@code .tar.gz}).
   *
   * @param fullName file name or path; must not be null
   * @return the extension including its leading dot, or an empty string when the last path
   *     component contains no dot
   * @throws NullPointerException if {@code fullName} is null
   */
  public static String getFileExtension(String fullName) {
    Objects.requireNonNull(fullName, "fullName must not be null");
    // Only the last path component is inspected, so dots in parent directories are ignored.
    String fileName = new File(fullName).getName();
    int dotIndex = fileName.indexOf('.');
    return dotIndex < 0 ? "" : fileName.substring(dotIndex);
  }



  /**
   * Strips the extension, as computed by {@link #getFileExtension(String)}, from the given name.
   * Every occurrence of the extension text in {@code name} is removed.
   *
   * @param name file name, e.g. {@code 20170101134598.commit}
   * @return {@code name} without its extension
   */
  public static String getInstantTime(String name) {
    final String extension = getFileExtension(name);
    return name.replace(extension, "");
  }


  /**
   * Get the base commit time encoded in a log file name, i.e. the second capture group of the log
   * file pattern (the text between the underscore and the following dot).
   *
   * @param path path of the log file; only its last component is examined
   * @return the base commit time portion of the name
   * @throws InvalidHoodiePathException if the name does not look like a log file
   */
  public static String getBaseCommitTimeFromLogPath(Path path) {
    final Matcher logNameMatcher = LOG_FILE_PATTERN.matcher(path.getName());
    if (logNameMatcher.find()) {
      return logNameMatcher.group(2);
    }
    throw new InvalidHoodiePathException(path, "LogFile");
  }

  /**
   * Get the task partition id encoded in a log file name, i.e. the first number of the optional
   * {@code _<n>-<n>-<n>} suffix.
   *
   * @param path path of the log file; only its last component is examined
   * @return the task partition id, or {@code null} when the name carries no such suffix
   * @throws InvalidHoodiePathException if the name does not look like a log file
   */
  public static Integer getTaskPartitionIdFromLogPath(Path path) {
    final Matcher logNameMatcher = LOG_FILE_PATTERN.matcher(path.getName());
    if (logNameMatcher.find()) {
      final String taskPartitionId = logNameMatcher.group(7);
      if (taskPartitionId == null) {
        return null;
      }
      return Integer.parseInt(taskPartitionId);
    }
    throw new InvalidHoodiePathException(path, "LogFile");
  }


  /**
   * Builds a log file name of the form
   * {@code .<fileId>_<baseCommitTime><logFileExtension>.<version>}, followed by
   * {@code _<writeToken>} when a write token is supplied.
   *
   * <p>The name is assembled by concatenation rather than {@code String.format("%d")}, because
   * {@code %d} localizes digits according to the default locale while the version must be written
   * with ASCII digits.
   *
   * @param fileId file id the log belongs to
   * @param logFileExtension extension appended directly after the base commit time
   *     (presumably including its leading dot, e.g. {@code .log} - confirm with callers)
   * @param baseCommitTime base commit time of the log
   * @param version version number of the log file
   * @param writeToken write token to append, or {@code null} for none
   * @return the assembled log file name
   */
  public static String makeLogFileName(String fileId, String logFileExtension,
      String baseCommitTime, int version, String writeToken) {
    StringBuilder name = new StringBuilder(LOG_FILE_PREFIX)
        .append(fileId)
        .append('_')
        .append(baseCommitTime)
        .append(logFileExtension)
        .append('.')
        .append(version);
    if (writeToken != null) {
      name.append('_').append(writeToken);
    }
    return name.toString();
  }

  /**
   * Tells whether the last component of the given path looks like a log file name.
   *
   * @param logPath path to inspect
   * @return {@code true} if the log file pattern is found in the name, {@code false} otherwise
   */
  public static boolean isLogFile(Path logPath) {
    final String fileName = logPath.getName();
    return LOG_FILE_PATTERN.matcher(fileName).find();
  }



  /**
   * Reads the {@code io.file.buffer.size} setting from the file system's configuration.
   *
   * @param fs file system whose configuration is consulted
   * @return the configured buffer size, or 4096 when the setting is absent
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    final String bufferSizeKey = "io.file.buffer.size";
    final int fallbackBufferSize = 4096;
    return fs.getConf().getInt(bufferSizeKey, fallbackBufferSize);
  }

  /**
   * Asks the file system for the default replication of the given path.
   *
   * @param fs file system to query
   * @param path path the replication applies to
   * @return the default replication, boxed
   */
  public static Short getDefaultReplication(FileSystem fs, Path path) {
    final short replication = fs.getDefaultReplication(path);
    return Short.valueOf(replication);
  }

  /**
   * When a file was opened and the task died without closing the stream, another task executor
   * cannot open because the existing lease will be active. We will try to recover the lease, from
   * HDFS. If a data node went down, it takes about 10 minutes for the lease to be recovered. But if
   * the client dies, this should be instant.
   *
   * <p>Recovery is attempted up to {@code MAX_ATTEMPTS_RECOVER_LEASE} times with a one second
   * pause between attempts.
   *
   * @param dfs distributed file system holding the file
   * @param p path of the file whose lease should be recovered
   * @return {@code true} as soon as an attempt reports the lease recovered, {@code false} if
   *     every attempt failed
   * @throws IOException if a recovery call fails
   * @throws InterruptedException if the thread is interrupted while pausing between attempts
   */
  public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p)
      throws IOException, InterruptedException {
    LOG.info("Recover lease on dfs file " + p);
    // initiate the recovery
    for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) {
      LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
      if (dfs.recoverLease(p)) {
        return true;
      }
      // Sleep for 1 second before trying again. Typically it takes about 2-3 seconds to recover
      // under default settings. No pause after the final attempt: nothing follows it.
      if (nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE - 1) {
        Thread.sleep(1000);
      }
    }
    return false;
  }



  /**
   * Creates the given directory (and any missing parents) unless the path already exists.
   *
   * @param fs file system to operate on
   * @param partitionPath directory to create
   * @throws IOException if the existence check or the directory creation fails
   */
  public static void createPathIfNotExists(FileSystem fs, Path partitionPath) throws IOException {
    final boolean alreadyPresent = fs.exists(partitionPath);
    if (alreadyPresent) {
      return;
    }
    fs.mkdirs(partitionPath);
  }

  /**
   * Converts a byte count to whole mebibytes using integer division (the fraction is discarded).
   *
   * @param sizeInBytes size in bytes
   * @return {@code sizeInBytes / (1024 * 1024)}, boxed
   */
  public static Long getSizeInMB(long sizeInBytes) {
    final long bytesPerMb = 1024L * 1024L;
    final long wholeMegabytes = sizeInBytes / bytesPerMb;
    return Long.valueOf(wholeMegabytes);
  }

  /**
   * String-based convenience overload of {@link #getPartitionPath(Path, String)}.
   *
   * @param basePath base path of the table, as a string
   * @param partitionPath partition path relative to the base path; may be null or empty
   * @return the result of the {@code Path}-based overload for the parsed base path
   */
  public static Path getPartitionPath(String basePath, String partitionPath) {
    final Path base = new Path(basePath);
    return getPartitionPath(base, partitionPath);
  }

  /**
   * Resolves a relative partition path against the base path.
   *
   * @param basePath base path of the table
   * @param partitionPath partition path relative to the base path; may be null or empty
   * @return {@code basePath} itself when {@code partitionPath} is null or empty, otherwise the
   *     partition path resolved under {@code basePath}
   */
  public static Path getPartitionPath(Path basePath, String partitionPath) {
    if (partitionPath != null && !partitionPath.isEmpty()) {
      return new Path(basePath, partitionPath);
    }
    // For a non-partitioned table, return only the base path
    return basePath;
  }
}
